package kafka

import (
	"gitee.com/xfrm/middleware/xmq/pub"
	"gitee.com/xfrm/middleware/xstat/xmetric/xprometheus"
	"gitee.com/xfrm/middleware/xtrace"
	"github.com/segmentio/kafka-go"
)

func statKafkaReader(topic string, stats *kafka.ReaderStats) {
	command := "reader"
	pub.GetMessageNumberMetric().With("topic", topic, "command", command,
		xprometheus.LabelMessageBusType, pub.TraceMessageBusTypeKafka, xprometheus.LabelCallerService, xtrace.ServiceName()).Add(float64(stats.Messages))
	pub.GetFetchNumberMetric().With("topic", topic, "command", command,
		xprometheus.LabelMessageBusType, pub.TraceMessageBusTypeKafka, xprometheus.LabelCallerService, xtrace.ServiceName()).Add(float64(stats.Fetches))
	pub.GetTimeoutsNumberMetric().With("topic", topic, "command", command,
		xprometheus.LabelMessageBusType, pub.TraceMessageBusTypeKafka, xprometheus.LabelCallerService, xtrace.ServiceName()).Add(float64(stats.Timeouts))
	pub.GetBytesNumberMetric().With("topic", topic, "command", command,
		xprometheus.LabelMessageBusType, pub.TraceMessageBusTypeKafka, xprometheus.LabelCallerService, xtrace.ServiceName()).Add(float64(stats.Bytes))
	pub.GetErrorNumberMetric().With("topic", topic, "command", command,
		xprometheus.LabelMessageBusType, pub.TraceMessageBusTypeKafka, xprometheus.LabelCallerService, xtrace.ServiceName()).Add(float64(stats.Errors))
}

func statKafkaWriter(topic string, stats *kafka.WriterStats) {
	command := "writer"
	pub.GetMessageNumberMetric().With("topic", topic, "command", command,
		xprometheus.LabelMessageBusType, pub.TraceMessageBusTypeKafka, xprometheus.LabelCallerService, xtrace.ServiceName()).Add(float64(stats.Messages))
	pub.GetWriteNumberMetric().With("topic", topic, "command", command,
		xprometheus.LabelMessageBusType, pub.TraceMessageBusTypeKafka, xprometheus.LabelCallerService, xtrace.ServiceName()).Add(float64(stats.Writes))
	pub.GetBytesNumberMetric().With("topic", topic, "command", command,
		xprometheus.LabelMessageBusType, pub.TraceMessageBusTypeKafka, xprometheus.LabelCallerService, xtrace.ServiceName()).Add(float64(stats.Bytes))
	pub.GetErrorNumberMetric().With("topic", topic, "command", command,
		xprometheus.LabelMessageBusType, pub.TraceMessageBusTypeKafka, xprometheus.LabelCallerService, xtrace.ServiceName()).Add(float64(stats.Errors))
}
